package com.mimo.comet.ws.endpoint;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.util.Assert;
import org.yeauty.annotation.BeforeHandshake;
import org.yeauty.annotation.OnBinary;
import org.yeauty.annotation.OnClose;
import org.yeauty.annotation.OnError;
import org.yeauty.annotation.OnEvent;
import org.yeauty.annotation.OnMessage;
import org.yeauty.annotation.OnOpen;
import org.yeauty.annotation.RequestParam;
import org.yeauty.annotation.ServerEndpoint;
import org.yeauty.pojo.Session;

import com.mimo.comet.config.LocalCometServerConfig;
import com.mimo.comet.event.listener.SessionCreatedEvent;
import com.mimo.comet.provider.DefaultCometHeartBeatPlugIn;
import com.mimo.comet.provider.ICommandRouter;
import com.mimo.comet.provider.ISession;
import com.mimo.comet.provider.command.BaseCommand;
import com.mimo.comet.provider.command.echo.ErrorEchoCommand;
import com.mimo.comet.provider.command.echo.LoginSuccessEchoCommand;
import com.mimo.comet.provider.plugin.IHeartBeatPlugIn;
import com.mimo.comet.user.service.IUserService;
import com.mimo.common.comet.dto.user.DeviceDTO;
import com.mimo.common.comet.dto.user.UserLoginReq;
import com.mimo.common.http.header.HttpHeaderParams;
import com.mimo.common.listener.IMessage;
import com.mimo.common.logic.code.StatusCode;
import com.mimo.common.utils.JsonUtils;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.timeout.IdleStateEvent;

/**
 * WebSocket endpoint exposed at {@code /ws/connect}.
 *
 * <p>
 * Life cycle as implemented below: {@link #handshake} authenticates the connecting user through
 * {@link IUserService#login} and stores the outcome on the session; {@link #onOpen} either rejects the
 * connection with an {@link ErrorEchoCommand} or binds a {@link WebSocketChannel} for the user;
 * {@link #onMessage}/{@link #onBinary} forward commands to the {@link ICommandRouter};
 * {@link #onClose}/{@link #onError} unbind the user; {@link #onEvent} forwards all-idle events to the
 * heart beat plug-in.
 */
@EnableConfigurationProperties(value = { LocalCometServerConfig.class })
@ServerEndpoint(path = "/ws/connect",
    //
    maxFramePayloadLength = "196608", // The business requires the content field of a command to be up to 128KB, so the largest frame the socket accepts must exceed 128KB; the limit is currently 192KB
    port = "${ws.port}",
    //
    optionSoBacklog = "2000", // A sudden surge of connecting users could overflow a short accept queue, so the backlog is raised to 2000
    allIdleTimeSeconds = DefaultCometHeartBeatPlugIn.EXPIRED_TIMEOUT)
public class WebSocketServer {
  private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

  /**
   * Temporary value for local testing. Before releasing to a test or production environment, coordinate
   * with operations so that the real client IP address is made available.
   *
   * @deprecated
   */
  @Deprecated
  private static final String FADE_CLIENT_IP = "127.0.0.1";

  /** Terminal type that receives binary frames and is exempt from the legacy offline-message push. */
  private static final String WEB_TERMINAL = "WEB";

  private static final Set<String> TERMINALS = new HashSet<>(Arrays.asList("ANDROID", "IOS", "WEB")); // terminals allowed to connect

  /** Session attribute holding the uid supplied at handshake. */
  private static final String SESSION_KEY = "USER_ID";

  /** Session attribute holding the {@link StatusCode} of the login attempt. */
  private static final String AUTH_KEY = "AUTH_FLAG";

  /** Session attribute holding the terminal type supplied at handshake. */
  private static final String TERMINAL_TYPE = "TERMINAL_TYPE";

  /**
   * Clients whose encoded version ({@code major * 1_000_000 + minor * 1_000 + patch}) is below this value
   * get their pending messages pushed by the server.
   *
   * <p>
   * NOTE(review): the accompanying TODO speaks of version 1.2.0, which encodes to 1002000, whereas this
   * value corresponds to 10.2.0. The value is kept unchanged; confirm which one is intended.
   */
  private static final long LEGACY_PUSH_WATERSHED = 10002000L;

  @Autowired
  private LocalCometServerConfig localCometServerConfig;

  @Autowired
  private IHeartBeatPlugIn heartBeatPlugIn;

  @Autowired
  private IUserService userService;

  @Autowired
  private IMessage cometSessionListener;

  @Autowired
  private ICommandRouter commandRouter;

  /**
   * Validates the connection parameters and authenticates the user before the WebSocket handshake.
   *
   * <p>
   * The uid, the login status and the terminal type are stored on the session regardless of the login
   * outcome; {@link #onOpen} later decides whether the connection is kept. On success the local session
   * of the same uid/device is unbound and a {@link SessionCreatedEvent} is published.
   *
   * @param session    session being established
   * @param headers    HTTP headers of the upgrade request; {@code X-Forwarded-For} is read for the client IP
   * @param uid        user id, required
   * @param token      login token, required; only a masked form is logged
   * @param deviceId   device id, required
   * @param terminal   terminal type, required, must be one of {@link #TERMINALS}
   * @param version    client version, required
   * @param os         device operating system, passed to the login request
   * @param brand      device brand, passed to the login request
   * @param model      device model, passed to the login request
   * @param appVersion application version, passed to the login request
   * @throws NullPointerException     if a required parameter is {@code null}
   * @throws IllegalArgumentException if {@code terminal} is not an allowed terminal
   */
  @BeforeHandshake
  public void handshake(Session session, HttpHeaders headers, @RequestParam String uid, @RequestParam String token,
      @RequestParam String deviceId, @RequestParam String terminal, @RequestParam String version,
      @RequestParam String os, @RequestParam String brand, @RequestParam String model,
      @RequestParam String appVersion) {

    // Start pessimistic: if the login call between services fails with an exception, the client must see
    // a server error and is allowed to keep retrying.
    StatusCode code = StatusCode.Error;

    // NOTE(review): X-Forwarded-For may carry a comma separated proxy chain; the raw value is forwarded as-is.
    String clientIp = Optional.ofNullable(headers.get(HttpHeaderParams.X_FORWARDED_FOR)).orElse(FADE_CLIENT_IP);

    // The token is a credential, so only a masked form is written to the log.
    String maskedToken = maskToken(token);

    // Log the connection request, then run the basic non-null checks.
    log.info(
        "Login Request-> uid:{}, token:{},deviceId:{},terminal:{},version:{},client ip:{},os:{},brand:{},model:{},appVersion:{}",
        uid, maskedToken, deviceId, terminal, version, clientIp, os, brand, model, appVersion);
    Objects.requireNonNull(uid, "uid");
    Objects.requireNonNull(token, "token");
    Objects.requireNonNull(deviceId, "deviceId");
    Objects.requireNonNull(terminal, "terminal");
    Objects.requireNonNull(version, "version");
    Assert.isTrue(TERMINALS.contains(terminal), String.format("输入的terminal:[%s]非法", terminal));

    // Bind the attributes first, whatever the outcome will be.
    session.setAttribute(SESSION_KEY, uid);
    session.setAttribute(AUTH_KEY, code);
    session.setAttribute(TERMINAL_TYPE, terminal);

    // The text protocol is supported by default.
    session.setSubprotocols("stomp");

    UserLoginReq loginReq = new UserLoginReq(uid, token, localCometServerConfig.getZone(), clientIp, deviceId, terminal,
        version, new DeviceDTO(terminal, os, brand, model, appVersion));
    StatusCode loginResult = userService.login(loginReq);
    if (loginResult != null) { // a missing result keeps the pessimistic default instead of failing with an NPE
      code = loginResult;
    }

    // Overwrite the provisional status with the real result.
    session.setAttribute(AUTH_KEY, code);

    if (code.isSuccess()) {
      // Check the local session and remove it if it exists.
      userService.unbinding(uid, null, deviceId);

      // Tell the cluster that a new session was created; other instances have to interrupt and kick theirs.
      SessionCreatedEvent evt = new SessionCreatedEvent(localCometServerConfig.getApplicationId(), uid,
          localCometServerConfig.getZone(), deviceId);
      cometSessionListener.send(evt);
    } else {
      log.error("登陆验证失败, 握手阶段返回-> uid:{}, token:{},deviceId:{},terminal:{},version:{},client ip:{}, code:{}", uid,
          maskedToken, deviceId, terminal, version, clientIp, code);
    }
  }

  /**
   * Completes the connection after the handshake.
   *
   * <p>
   * A session whose stored login status is not {@link StatusCode#Success} receives an error echo and is
   * closed. Otherwise a {@link WebSocketChannel} is bound for the user and a login-success echo is written;
   * non-WEB clients below {@link #LEGACY_PUSH_WATERSHED} additionally get their pending messages pushed.
   * If binding fails the session is marked as errored, an error echo is sent and the channel is closed.
   *
   * @param session  the opened session
   * @param headers  HTTP headers of the upgrade request (unused)
   * @param uid      user id; must equal the uid stored at handshake
   * @param deviceId device id, stored on the bound channel
   * @param terminal terminal type
   * @param version  client version in {@code major.minor.patch} form
   * @throws IllegalArgumentException if {@code uid} differs from the uid stored on the session
   */
  @OnOpen
  public void onOpen(Session session, HttpHeaders headers, @RequestParam String uid, @RequestParam String deviceId,
      @RequestParam String terminal, @RequestParam String version) {
    Assert.isTrue(Objects.equals(uid, session.getAttribute(SESSION_KEY)), "新建连接打开处理有误");

    StatusCode storedCode = session.getAttribute(AUTH_KEY);
    if (!Objects.equals(storedCode, StatusCode.Success)) {
      // A missing status is reported as a generic error rather than dereferencing null.
      sendError(session, storedCode != null ? storedCode : StatusCode.Error);
      session.close();
      return;
    }

    // Attach the device context to the bound session.
    ISession sc = new WebSocketChannel(session, uid, terminal);
    sc.addAttribute("deviceId", deviceId);

    if (!userService.binding(uid, sc)) {
      log.warn("会话绑定前，此前用户[{}]会话有残留, 关闭此次绑定动作", uid);
      // Mark this login as failed with an error status so that the client retries.
      session.setAttribute(AUTH_KEY, StatusCode.Error);
      sendError(session, StatusCode.Error);
      sc.close();
      return;
    }

    sc.write(JsonUtils.toJsonString(new LoginSuccessEchoCommand()));

    // TODO zehui remove - version compatibility: starting with client version 1.2.0 (inclusive) the client
    // fetches offline messages itself. Older clients need the server to push them.
    if (!Objects.equals(terminal, WEB_TERMINAL) && requiresServerPush(version)) {
      userService.getPendingMessage(uid).forEach(msg -> sc.write(JsonUtils.toJsonString(msg)));
    }
  }

  /**
   * Unbinds the user when the connection closes.
   *
   * <p>
   * Binding only happens after a successful authentication, so only authenticated sessions are unbound;
   * for the others a warning is logged.
   *
   * @param session the closed session
   */
  @OnClose
  public void onClose(Session session) {
    String userId = session.getAttribute(SESSION_KEY);
    if (Objects.isNull(userId)) {
      return;
    }

    if (isAuthenticated(session)) {
      userService.unbinding(userId, null);
    } else {
      log.warn("打印非认证下的用户[{}]信息", userId);
    }
  }

  /**
   * Logs the failure and, for an authenticated user, unbinds the user and closes the session.
   *
   * <p>
   * Sessions without a uid or without a successful login are only logged here; they are not closed by
   * this method.
   *
   * @param session   the session on which the error occurred
   * @param throwable the reported error
   */
  @OnError
  public void onError(Session session, Throwable throwable) {
    String userId = session.getAttribute(SESSION_KEY);
    log.error(String.format("用户:[%s]异常中断事件", userId), throwable);

    // Binding only happens after a successful authentication, hence the check of the auth flag.
    if (Objects.isNull(userId) || !isAuthenticated(session)) {
      return;
    }

    try {
      userService.unbinding(userId, null);
    } catch (Exception e) {
      log.error("userId{}解绑失败", userId, e);
    } finally {
      session.close();
    }
  }

  /**
   * Decodes a binary frame as UTF-8 text and handles it like a text message.
   *
   * @param session the receiving session
   * @param bytes   UTF-8 encoded command
   */
  @OnBinary
  public void onBinary(Session session, byte[] bytes) {
    this.onMessage(session, new String(bytes, StandardCharsets.UTF_8));
  }

  /**
   * Parses the message into a {@link BaseCommand} and routes it for the session's user.
   *
   * <p>
   * Messages are dropped when the session has no uid or is not authenticated. The uid is stored on the
   * session before authentication, so its presence alone does not prove a successful login.
   *
   * @param session the receiving session
   * @param message JSON encoded command
   */
  @OnMessage
  public void onMessage(Session session, String message) {
    String userId = session.getAttribute(SESSION_KEY);
    if (Objects.isNull(userId)) {
      return;
    }
    if (!isAuthenticated(session)) {
      log.warn("Dropping message from unauthenticated session, uid:{}", userId);
      return;
    }

    BaseCommand cmd = JsonUtils.parseJson(message, BaseCommand.class);
    commandRouter.route(userId, cmd, session);
  }

  /**
   * Forwards all-idle events to the heart beat plug-in; every other event is ignored.
   *
   * @param session the session the event belongs to
   * @param evt     the user event
   */
  @OnEvent
  public void onEvent(Session session, Object evt) {
    if (!(evt instanceof IdleStateEvent)) {
      return;
    }

    switch (((IdleStateEvent) evt).state()) {
      case ALL_IDLE:
        String userId = session.getAttribute(SESSION_KEY);
        if (Objects.nonNull(userId)) { // on the idle callback the heart beat plug-in takes care of the session
          heartBeatPlugIn.onExpired(userId, session);
        }
        break;
      default: // reader-idle and writer-idle need no action
        break;
    }
  }

  /**
   * Writes a message to the session; WEB (H5) terminals get it as a binary frame, all others as text.
   *
   * @param session target session
   * @param msg     message to write
   */
  private void send(Session session, String msg) {
    if (Objects.equals(session.getAttribute(TERMINAL_TYPE), WEB_TERMINAL)) {
      session.sendBinary(msg.getBytes(StandardCharsets.UTF_8));
    } else {
      session.sendText(msg);
    }
  }

  /** Sends an {@link ErrorEchoCommand} carrying the code and message of the given status. */
  private void sendError(Session session, StatusCode status) {
    ErrorEchoCommand echo = new ErrorEchoCommand();
    echo.setCode(status.getCode());
    echo.setMsg(status.getMsg());
    send(session, JsonUtils.toJsonString(echo));
  }

  /** Tells whether the login status stored on the session equals {@link StatusCode#Success}. */
  private static boolean isAuthenticated(Session session) {
    return Objects.equals(session.getAttribute(AUTH_KEY), StatusCode.Success);
  }

  /**
   * Decides whether pending messages have to be pushed to a client of the given version.
   *
   * <p>
   * The first three dot separated components are encoded as {@code major * 1_000_000 + minor * 1_000 + patch};
   * missing components count as 0. A version that cannot be parsed is logged and treated as not requiring
   * the push, so a malformed version no longer aborts the connection.
   *
   * @param version client version, e.g. {@code 1.2.0}
   * @return {@code true} if the encoded version is below {@link #LEGACY_PUSH_WATERSHED}
   */
  private static boolean requiresServerPush(String version) {
    String[] parts = version == null ? new String[0] : version.split("\\.");
    if (parts.length == 0) {
      log.warn("Unparseable client version [{}], skipping legacy offline-message push", version);
      return false;
    }

    try {
      long encoded = 0L;
      for (int i = 0; i < 3; i++) {
        long component = i < parts.length ? Long.parseLong(parts[i]) : 0L;
        encoded = encoded * 1000 + component;
      }
      return encoded < LEGACY_PUSH_WATERSHED;
    } catch (NumberFormatException e) {
      log.warn("Unparseable client version [{}], skipping legacy offline-message push", version);
      return false;
    }
  }

  /** Returns a form of the token that is safe to log: at most its first four characters plus a mask. */
  private static String maskToken(String token) {
    if (token == null) {
      return null;
    }
    return token.length() > 8 ? token.substring(0, 4) + "****" : "****";
  }

}